package com.data.dev.utils;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaDnsTopic.Dns;
import com.data.dev.common.javabean.kafkaDnsTopic.DnsMsg;
import com.data.dev.common.javabean.kafkaDnsTopic.MetaData;
import com.data.dev.common.javabean.kafkaDnsTopic.Question;
import com.data.dev.common.javabean.kafkaKelaiTopic.KelaiMsg;
import com.data.dev.common.javabean.kafkaKelaiTopic.KeleiMsgList;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import com.data.dev.key.DnsMsgKey;
import com.data.dev.key.KelaiMsgKey;
import com.data.dev.key.MailMsgKey;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;


/**
 * Static helpers that turn raw Kafka message payloads (JSON strings) into the
 * project's Java beans: {@link DnsMsg}, {@link MailMsg}, {@link KeleiMsgList}
 * and {@link KelaiMsg}.
 *
 * <p>String-typed bean fields are filled via {@code String.valueOf(json.get(key))};
 * if {@code json.get} returns {@code null} for an absent key, the field receives
 * the literal text {@code "null"} rather than a null reference
 * (NOTE(review): confirm what {@code JsonUtils.get} returns for missing keys).
 */
public class MsgToJavaBeanUtils extends BaseBean implements Serializable {

    /**
     * Converts a JSON message from the DNS topic into a {@link DnsMsg} bean.
     *
     * <p>Example payload:
     * <pre>
     * {
     * 	"@timestamp":"2022-05-17T02:49:34.801Z",
     * 	"@metadata":{
     * 		"beat":"packetbeat",
     * 		"type":"_doc",
     * 		"version":"7.10.2"
     *        },
     * 	"ip":"10.0.0.10",
     * 	"dns":{
     * 		"question":{
     * 			"name":"sd.dnslog.cn",
     * 			"type":"AAAA"
     *        }
     *    },
     * 	"client_ip":"10.63.201.22",
     * 	"from":"165-15-ens3f1",
     * 	"type":"dns"
     * }
     * </pre>
     *
     * @param jsonMsgFromKafkaDnsTopic raw JSON text of one DNS-topic message; must not be null
     * @return a populated {@link DnsMsg} with its nested {@link MetaData} and {@link Dns}/{@link Question}
     */
    public static DnsMsg getDnsMsgBean(String jsonMsgFromKafkaDnsTopic) {
        // All '@' characters are removed before parsing so that "@timestamp" and
        // "@metadata" become plain key names.
        // NOTE(review): this also strips '@' occurring inside values — confirm that is acceptable.
        JsonUtils json = new JsonUtils(jsonMsgFromKafkaDnsTopic.replace("@", ""));

        // Populate the question (innermost object).
        Question question = new Question();
        question.setName(String.valueOf(json.get(DnsMsgKey.DNS_QUESTION_NAME)));
        question.setType(String.valueOf(json.get(DnsMsgKey.DNS_QUESTION_TYPE)));

        // Populate the dns wrapper.
        Dns dns = new Dns();
        dns.setQuestion(question);

        // Populate the metadata.
        MetaData metaData = new MetaData();
        metaData.setBeat(String.valueOf(json.get(DnsMsgKey.DNS_METADATA_BEAT)));
        metaData.setType(String.valueOf(json.get(DnsMsgKey.DNS_METADATA_TYPE)));
        metaData.setVersion(String.valueOf(json.get(DnsMsgKey.DNS_METADATA_VERSION)));

        // Populate the top-level message.
        DnsMsg dnsMsg = new DnsMsg();
        dnsMsg.setTimestamp(String.valueOf(json.get(DnsMsgKey.DNS_TIMESTAMP)));
        dnsMsg.setMetaData(metaData);
        dnsMsg.setIp(String.valueOf(json.get(DnsMsgKey.DNS_IP)));
        dnsMsg.setDns(dns);
        dnsMsg.setClient_ip(String.valueOf(json.get(DnsMsgKey.DNS_CLIENT_IP)));
        dnsMsg.setFrom(String.valueOf(json.get(DnsMsgKey.DNS_FROM)));
        dnsMsg.setType(String.valueOf(json.get(DnsMsgKey.DNS_TYPE)));

        return dnsMsg;
    }

    /**
     * Converts a JSON message from the mail topic into a {@link MailMsg} bean.
     *
     * <p>Example payload:
     * <pre>
     * {
     *   "user": "yangxuefei-lhb",
     *   "client_ip": "10.68.6.182",
     *   "source": "login",
     *   "loginname": "yangxuefei-lhb@sinosig.com",
     *   "IP": "10.8.148.58",
     *   "timestamp": "17:58:12",
     *   "@timestamp": "2022-04-20T09:58:13.647Z",
     *   "ip": "10.7.231.25",
     *   "clienttype": "POP3",
     *   "result": "success",
     *   "@version": "1"
     * }
     * </pre>
     *
     * <p>Unlike {@link #getDnsMsgBean(String)}, the payload is parsed as-is
     * (no '@' stripping).
     *
     * @param jsonMsgFromKafkaMailTopic raw JSON text of one mail-topic message
     * @return a populated {@link MailMsg}
     */
    public static MailMsg getMailMsgBean(String jsonMsgFromKafkaMailTopic) {
        JsonUtils json = new JsonUtils(jsonMsgFromKafkaMailTopic);

        // Populate the mail message bean field by field.
        MailMsg mailMsg = new MailMsg();
        mailMsg.setUser(String.valueOf(json.get(MailMsgKey.MAIL_USER)));
        mailMsg.setClient_ip(String.valueOf(json.get(MailMsgKey.MAIL_CLIENT_IP)));
        mailMsg.setSource(String.valueOf(json.get(MailMsgKey.MAIL_SOURCE)));
        mailMsg.setLoginName(String.valueOf(json.get(MailMsgKey.MAIL_LOGIN_NAME)));
        mailMsg.setMailSenderSourceIp(String.valueOf(json.get(MailMsgKey.MAIL_SENDER_IP)));
        mailMsg.setTimestamp_time(String.valueOf(json.get(MailMsgKey.MAIL_TIMESTAMP_TIME)));
        mailMsg.setTimestamp_datetime(String.valueOf(json.get(MailMsgKey.MAIL_TIMESTAMP_DATETIME)));
        mailMsg.setIp(String.valueOf(json.get(MailMsgKey.MAIL_TARGET_IP)));
        mailMsg.setClientType(String.valueOf(json.get(MailMsgKey.MAIL_CLIENT_TYPE)));
        mailMsg.setResult(String.valueOf(json.get(MailMsgKey.MAIL_RESULT)));
        mailMsg.setVersion(String.valueOf(json.get(MailMsgKey.MAIL_VERSION)));

        return mailMsg;
    }

    /**
     * Wraps a raw Kafka message in a {@link KeleiMsgList} bean.
     *
     * <p>The message is not parsed or split; it is placed unchanged into a
     * single-element, immutable list ({@link Collections#singletonList(Object)}).
     *
     * @param keleiMsg the raw Kafka message
     * @return a {@link KeleiMsgList} whose list contains exactly {@code keleiMsg}
     */
    public static KeleiMsgList getKeleiMsgListBean(String keleiMsg) {
        List<String> kafkakeleiMsgList = Collections.singletonList(keleiMsg);

        // Populate the list bean.
        KeleiMsgList keleiMsgList = new KeleiMsgList();
        keleiMsgList.setKeleiMsgList(kafkakeleiMsgList);
        return keleiMsgList;
    }

    /**
     * Converts one JSON flow record into a {@link KelaiMsg} bean.
     *
     * <p>Keys read (constants of {@link KelaiMsgKey}):
     * <pre>
     *     FLOW_START_TIME     = "flow_start_time"
     *     APP                 = "app"
     *     SERVER_TOTAL_PACKET = "server_total_packet"
     *     CLIENT_TOTAL_BYTE   = "client_total_byte"
     *     CLIENT_TOTAL_PACKET = "client_total_packet"
     *     NETLINK             = "netlink"
     *     CLIENT_IP_ADDR      = "client_ip_addr"
     *     TCP_STATUS          = "tcp_status"
     *     FLOW_END_TIME       = "flow_end_time"
     *     CLIENT_PORT         = "client_port"
     *     LOG_TYPE            = "log_type"
     *     PROTOCOL            = "protocol"
     *     SERVER_PORT         = "server_port"
     *     SERVER_IP_ADDR      = "server_ip_addr"
     *     SERVER_TOTAL_BYTE   = "server_total_byte"
     *     HOST_NAME           = "host_name"
     *     DIRECTION           = "direction"
     * </pre>
     *
     * @param keleiMsgJsonStr raw JSON text of one flow record
     * @return a populated {@link KelaiMsg}
     * @throws NumberFormatException if {@code flow_start_time}, {@code flow_end_time},
     *         {@code app} or {@code server_total_packet} cannot be parsed as a number
     */
    public static KelaiMsg getKelaiMsgBean(String keleiMsgJsonStr) {
        JsonUtils json = new JsonUtils(keleiMsgJsonStr);

        // Populate the flow record bean.
        KelaiMsg kelaiMsg = new KelaiMsg();

        // Every "L" character is removed from the time values before parsing
        // (presumably to drop a Java-style long suffix such as "1650448693L" — TODO confirm).
        kelaiMsg.setFlow_start_time(
                Long.parseLong(String.valueOf(json.get(KelaiMsgKey.FLOW_START_TIME)).replace("L", "")));
        kelaiMsg.setFlow_end_time(
                Long.parseLong(String.valueOf(json.get(KelaiMsgKey.FLOW_END_TIME)).replace("L", "")));

        // NOTE(review): these two go through String + Integer.parseInt while the other
        // int fields use json.getInt; kept as-is because JsonUtils.getInt semantics are
        // not visible here.
        kelaiMsg.setApp(Integer.parseInt(String.valueOf(json.get(KelaiMsgKey.APP))));
        kelaiMsg.setServer_total_packet(
                Integer.parseInt(String.valueOf(json.get(KelaiMsgKey.SERVER_TOTAL_PACKET))));

        kelaiMsg.setClient_total_byte(json.getInt(KelaiMsgKey.CLIENT_TOTAL_BYTE));
        kelaiMsg.setClient_total_packet(json.getInt(KelaiMsgKey.CLIENT_TOTAL_PACKET));
        kelaiMsg.setNetlink(json.getInt(KelaiMsgKey.NETLINK));
        kelaiMsg.setClient_ip_addr(String.valueOf(json.get(KelaiMsgKey.CLIENT_IP_ADDR)));
        kelaiMsg.setTcp_status(json.getInt(KelaiMsgKey.TCP_STATUS));
        kelaiMsg.setClient_port(json.getInt(KelaiMsgKey.CLIENT_PORT));
        kelaiMsg.setLog_type(String.valueOf(json.get(KelaiMsgKey.LOG_TYPE)));
        kelaiMsg.setProtocol(json.getInt(KelaiMsgKey.PROTOCOL));
        kelaiMsg.setServer_port(json.getInt(KelaiMsgKey.SERVER_PORT));
        // Read the server address from the JSON payload. Previously the key constant
        // itself was stored, so every record carried the key name instead of an IP.
        kelaiMsg.setServer_ip_addr(String.valueOf(json.get(KelaiMsgKey.SERVER_IP_ADDR)));
        kelaiMsg.setServer_total_byte(json.getInt(KelaiMsgKey.SERVER_TOTAL_BYTE));
        kelaiMsg.setHost_name(String.valueOf(json.get(KelaiMsgKey.HOST_NAME)));
        kelaiMsg.setDirection(json.getInt(KelaiMsgKey.DIRECTION));

        return kelaiMsg;
    }

}
